package com.uyuni.rpc.client.provider;

import com.uyuni.rpc.client.provider.model.DefaultProviderInactiveProcessor;
import com.uyuni.rpc.common.exception.remoting.RemotingException;
import com.uyuni.rpc.common.protocol.UyuniProtocol;
import com.uyuni.rpc.common.transport.body.PublishServiceCustomBody;
import com.uyuni.rpc.common.utils.NamedThreadFactory;
import com.uyuni.rpc.transport.model.RemotingTransporter;
import com.uyuni.rpc.transport.netty.NettyClientConfig;
import com.uyuni.rpc.transport.netty.NettyRemotingClient;
import com.uyuni.rpc.transport.netty.NettyRemotingServer;
import com.uyuni.rpc.transport.netty.NettyServerConfig;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.*;

/**
 * Default {@link Provider} implementation.
 *
 * <p>On construction it creates one Netty client, two Netty servers (regular and VIP),
 * two fixed-size worker pools for RPC processing and registers the request processors.
 * It also schedules a set of periodic maintenance jobs on a single-threaded scheduler
 * (see {@link #initialize()}). Calling {@link #start()} wraps the services to publish,
 * starts the client, publishes the services and finally starts both servers.
 */
public class DefaultProvider implements Provider {

    private static final Logger logger = LoggerFactory.getLogger(DefaultProvider.class);

    /** Offset subtracted from the exposed port to obtain the VIP server's listen port. */
    private static final int VIP_PORT_OFFSET = 2;

    private NettyClientConfig clientConfig;               // configuration of the netty client that connects to the registry
    private NettyServerConfig serverConfig;               // configuration shared by both netty servers created in initialize()
    private NettyRemotingClient nettyRemotingClient;      // connects to the monitor and the registry
    private NettyRemotingServer nettyRemotingServer;      // waits for consumers to connect
    private NettyRemotingServer nettyRemotingVipServer;   // waits for consumers to connect on the VIP port
    private ProviderRegistryController providerRegistryController; // controller for the provider-to-registry business logic
    private ProviderRPCController providerRPCController;  // core controller for remote calls coming from consumers

    private ExecutorService remotingExecutor;             // worker pool that executes RPC calls
    private ExecutorService remotingVipExecutor;          // worker pool that executes VIP RPC calls

    // Scheduler that runs all periodic maintenance jobs on a single thread.
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("provider-timer"));

    /** Globally published services, keyed by service provider name. */
    private ConcurrentMap<String, PublishServiceCustomBody> globalPublishService = new ConcurrentHashMap<>();

    /** Address of the registry. */
    private String registryAddress;
    /** Port on which the services are exposed to consumers. */
    private int exposePort;
    /** Address of the monitor. */
    private String monitorAddress;

    /** Information about the services to publish. */
    private List<RemotingTransporter> publishRemotingTransporters;

    /** The service instances to provide. */
    private Object[] obj;

    // Whether the publication state of this provider is considered healthy. When it is false the
    // periodic check in initialize() republishes the services and publishedAndStartProvider()
    // flips it back to true. The flag is read on the scheduler thread and written from other
    // threads (start() caller, setter), hence volatile so that updates are visible.
    private volatile boolean providerStateIsHealthy = true;

    /** Body of a periodic job that is allowed to throw checked exceptions. */
    @FunctionalInterface
    private interface CheckedTask {
        void run() throws Exception;
    }

    /**
     * Creates a provider with default client and server configurations.
     */
    public DefaultProvider() {
        this(new NettyClientConfig(), new NettyServerConfig());
    }

    /**
     * Creates a provider using the given configurations.
     *
     * @param clientConfig configuration for the netty client
     * @param serverConfig configuration shared by the regular and the VIP netty server
     */
    public DefaultProvider(NettyClientConfig clientConfig, NettyServerConfig serverConfig) {
        this.clientConfig = clientConfig;
        this.serverConfig = serverConfig;
        providerRegistryController = new ProviderRegistryController(this);
        providerRPCController = new ProviderRPCController(this);
        initialize();
    }

    /**
     * Builds the transport objects and worker pools, registers the processors and schedules
     * the periodic maintenance jobs.
     */
    private void initialize() {

        this.nettyRemotingServer = new NettyRemotingServer(this.serverConfig);
        this.nettyRemotingClient = new NettyRemotingClient(this.clientConfig);
        this.nettyRemotingVipServer = new NettyRemotingServer(this.serverConfig);

        int workerThreads = serverConfig.getServerWorkerThreads();
        this.remotingExecutor = Executors.newFixedThreadPool(workerThreads, new NamedThreadFactory("providerExecutorThread_"));
        // Half of the regular pool, but never 0: newFixedThreadPool rejects a non-positive size,
        // which would otherwise happen when a single worker thread is configured.
        this.remotingVipExecutor = Executors.newFixedThreadPool(Math.max(1, workerThreads / 2), new NamedThreadFactory("providerExecutorThread_"));
        // Register the processors.
        this.registerProcessor();

        // After an initial delay of 60 seconds, every 60 seconds: republish the services
        // if the provider has been marked unhealthy.
        scheduleGuarded("schedule publish failed [{}]", 60, 60, TimeUnit.SECONDS, () -> {
            logger.info("schedule check publish service");
            if (!providerStateIsHealthy) {
                logger.info("channel which connected to registry,has been inactived,need to republish service");
                this.publishedAndStartProvider();
            }
        });

        // Every minute: let the registry controller check its failed publish messages.
        scheduleGuarded("schedule republish failed [{}]", 1, 1, TimeUnit.MINUTES, () -> {
            logger.info("ready send message");
            this.providerRegistryController.getRegistryController().checkPublishFailMessage();
        });

        // Clear the expired per-time-unit call statistics of all services.
        scheduleGuarded("schedule clear call count failed [{}]", 5, 45, TimeUnit.SECONDS, () -> {
            logger.info("ready prepare send Report");
            this.providerRegistryController.getServiceFlowControllerManager().clearAllServiceNextMinuteCallCount();
        });

        // If the monitor address is not null, statistics need to be sent periodically (currently disabled).
        //this.scheduledExecutorService.scheduleAtFixedRate(() -> DefaultProvider.this.providerController.getProviderMonitorController().sendMetricsInfo(), 5, 60, TimeUnit.SECONDS);

        // Every 60s verify that the channel to the monitor is healthy and reconnect if it is
        // unhealthy or inactive (currently disabled).
        /*this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                DefaultProvider.this.providerRegistryController.getProviderMonitorController().checkMonitorChannel();
            } catch (InterruptedException e) {
                logger.warn("schedule check monitor channel failed [{}]", e.getMessage());
            }
        }, 30, 60, TimeUnit.SECONDS);*/

        // Check whether any service needs to be degraded automatically.
        scheduleGuarded("schedule check auto degrade failed [{}]", 30, 60, TimeUnit.SECONDS,
                () -> this.providerRegistryController.checkAutoDegrade());
    }

    /**
     * Schedules {@code task} at a fixed rate and shields the schedule from failures.
     *
     * <p>{@code scheduleAtFixedRate} suppresses all subsequent executions once a run throws,
     * so every exception is caught and logged here to keep the job alive.
     *
     * @param failureFormat SLF4J format with one placeholder, used to log a failed run
     * @param initialDelay  delay before the first run
     * @param period        period between successive runs
     * @param unit          time unit of {@code initialDelay} and {@code period}
     * @param task          the job body
     */
    private void scheduleGuarded(String failureFormat, long initialDelay, long period, TimeUnit unit, CheckedTask task) {
        this.scheduledExecutorService.scheduleAtFixedRate(() -> {
            try {
                task.run();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the executor can observe the interruption.
                Thread.currentThread().interrupt();
                logger.warn(failureFormat, e.getMessage(), e);
            } catch (Exception e) {
                logger.warn(failureFormat, e.getMessage(), e);
            }
        }, initialDelay, period, unit);
    }

    /**
     * Registers the processors on the netty client and on both netty servers.
     */
    private void registerProcessor() {
        DefaultProviderRegistryProcessor defaultProviderRegistryProcessor = new DefaultProviderRegistryProcessor(this);
        // Processors used while the provider acts as a client connected to the registry.
        this.nettyRemotingClient.registerProcessor(UyuniProtocol.DEGRADE_SERVICE, defaultProviderRegistryProcessor, null);
        this.nettyRemotingClient.registerProcessor(UyuniProtocol.AUTO_DEGRADE_SERVICE, defaultProviderRegistryProcessor, null);
        // Processor invoked when the channel to the registry becomes inactive.
        this.nettyRemotingClient.registerChannelInactiveProcessor(new DefaultProviderInactiveProcessor(this), null);
        // Processors used while the provider acts as a netty server waiting for callers; they only handle RPC requests.
        this.nettyRemotingServer.registerDefaultProcessor(new DefaultProviderRPCProcessor(this), this.remotingExecutor);
        this.nettyRemotingVipServer.registerDefaultProcessor(new DefaultProviderRPCProcessor(this), this.remotingVipExecutor);
    }


    /**
     * Starts the provider: wraps the configured services, records them globally, starts the
     * netty client, publishes the services and, when an expose port is set, starts the regular
     * server on that port and the VIP server on {@code port - 2}.
     *
     * @throws InterruptedException declared by the interface; may be raised by the calls made here
     * @throws RemotingException    declared by the interface; may be raised by the calls made here
     */
    @Override
    public void start() throws InterruptedException, RemotingException {
        logger.info("######### provider starting... #########");
        // Wrap the services and keep the result in the instance field.
        this.publishRemotingTransporters = providerRegistryController.getLocalServerWrapperManager().wrapperRegisterInfo(this.getExposePort(), this.obj);
        logger.info("registry center address [{}] servicePort [{}] service [{}]", this.registryAddress, this.exposePort, this.publishRemotingTransporters);
        // Record the published services globally.
        initGlobalService();
        // Start the netty client.
        nettyRemotingClient.start();
        try {
            // Publish the services.
            this.publishedAndStartProvider();
            logger.info("######### provider start successfully..... ########");
        } catch (Exception e) {
            // Publishing is best-effort here: the failure is logged (with its stack trace)
            // and the servers below are still started.
            logger.error("publish service to registry failed [{}]", e.getMessage(), e);
        }

        int port = this.exposePort;
        if (port != 0) {
            // NOTE(review): both servers were built from the same serverConfig instance, so the
            // listen port is switched between the two start() calls; this presumably relies on
            // start() reading the port at call time - confirm.
            this.serverConfig.setListenPort(port);
            this.nettyRemotingServer.start();
            this.serverConfig.setListenPort(port - VIP_PORT_OFFSET);
            this.nettyRemotingVipServer.start();
        }
    }

    /**
     * Copies the custom body of every transporter to publish into {@link #globalPublishService},
     * keyed by service provider name. Does nothing when there is nothing to publish.
     */
    private void initGlobalService() {
        List<RemotingTransporter> transporters = this.publishRemotingTransporters;
        if (transporters == null) {
            return;
        }
        for (RemotingTransporter transporter : transporters) {
            PublishServiceCustomBody customBody = (PublishServiceCustomBody) transporter.getCustomBody();
            this.globalPublishService.put(customBody.getServiceProviderName(), customBody);
        }
    }

    /**
     * Handles a degrade request sent by a user.
     *
     * <p>Not implemented yet: this method currently always returns {@code null}.
     *
     * @param request        the incoming request
     * @param channel        the channel the request arrived on
     * @param degradeService the degrade operation code
     * @return always {@code null}
     */
    public RemotingTransporter handlerDegradeServiceRequest(RemotingTransporter request, Channel channel, byte degradeService) {
        return null;
    }

    /**
     * Publishes the services through the registry controller and marks the provider healthy again.
     *
     * @throws InterruptedException if the registry controller's publish call raises it
     * @throws RemotingException    if the registry controller's publish call raises it
     */
    @Override
    public void publishedAndStartProvider() throws InterruptedException, RemotingException {
        logger.info("publish service....");
        providerRegistryController.getRegistryController().publishedAndStartProvider();
        // After a successful publish the state is healthy again.
        providerStateIsHealthy = true;
    }

    /** Sets the port on which services are exposed and returns this provider for chaining. */
    @Override
    public Provider serviceListenPort(int port) {
        this.exposePort = port;
        return this;
    }

    /** Sets the registry address and returns this provider for chaining. */
    @Override
    public Provider registryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
        return this;
    }

    /** Sets the monitor address and returns this provider for chaining. */
    @Override
    public Provider monitorAddress(String monitorAddress) {
        this.monitorAddress = monitorAddress;
        return this;
    }

    /** Sets the service instances to publish and returns this provider for chaining. */
    @Override
    public Provider publishService(Object... obj) {
        this.obj = obj;
        return this;
    }

    /** Delegates an incoming RPC request to the {@link ProviderRPCController}. */
    @Override
    public void handlerRPCRequest(RemotingTransporter request, Channel channel) {
        providerRPCController.handlerRPCRequest(request, channel);
    }

    // ------------------------------------------------------------------ accessors

    public boolean isProviderStateIsHealthy() {
        return providerStateIsHealthy;
    }

    public void setProviderStateIsHealthy(boolean providerStateIsHealthy) {
        this.providerStateIsHealthy = providerStateIsHealthy;
    }

    public NettyClientConfig getClientConfig() {
        return clientConfig;
    }

    public void setClientConfig(NettyClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }

    public NettyServerConfig getServerConfig() {
        return serverConfig;
    }

    public void setServerConfig(NettyServerConfig serverConfig) {
        this.serverConfig = serverConfig;
    }

    public NettyRemotingClient getNettyRemotingClient() {
        return nettyRemotingClient;
    }

    public void setNettyRemotingClient(NettyRemotingClient nettyRemotingClient) {
        this.nettyRemotingClient = nettyRemotingClient;
    }

    public NettyRemotingServer getNettyRemotingServer() {
        return nettyRemotingServer;
    }

    public void setNettyRemotingServer(NettyRemotingServer nettyRemotingServer) {
        this.nettyRemotingServer = nettyRemotingServer;
    }

    public NettyRemotingServer getNettyRemotingVipServer() {
        return nettyRemotingVipServer;
    }

    public void setNettyRemotingVipServer(NettyRemotingServer nettyRemotingVipServer) {
        this.nettyRemotingVipServer = nettyRemotingVipServer;
    }

    public ProviderRegistryController getProviderRegistryController() {
        return providerRegistryController;
    }

    public void setProviderRegistryController(ProviderRegistryController providerRegistryController) {
        this.providerRegistryController = providerRegistryController;
    }

    public void setPublishRemotingTransporters(List<RemotingTransporter> publishRemotingTransporters) {
        this.publishRemotingTransporters = publishRemotingTransporters;
    }

    public List<RemotingTransporter> getPublishRemotingTransporters() {
        return publishRemotingTransporters;
    }

    public String getRegistryAddress() {
        return registryAddress;
    }

    public void setRegistryAddress(String registryAddress) {
        this.registryAddress = registryAddress;
    }

    public int getExposePort() {
        return exposePort;
    }

    public void setExposePort(int exposePort) {
        this.exposePort = exposePort;
    }

    public String getMonitorAddress() {
        return monitorAddress;
    }

    public void setMonitorAddress(String monitorAddress) {
        this.monitorAddress = monitorAddress;
    }

    public ConcurrentMap<String, PublishServiceCustomBody> getGlobalPublishService() {
        return globalPublishService;
    }

    public void setGlobalPublishService(ConcurrentMap<String, PublishServiceCustomBody> globalPublishService) {
        this.globalPublishService = globalPublishService;
    }

    public ProviderRPCController getProviderRPCController() {
        return providerRPCController;
    }

    public void setProviderRPCController(ProviderRPCController providerRPCController) {
        this.providerRPCController = providerRPCController;
    }
}
